/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
package org.unidata.mdm.dq.core.service.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.jgrapht.alg.CycleDetector;
import org.jgrapht.alg.cycle.SzwarcfiterLauerSimpleCycles;
import org.jgrapht.graph.DirectedMultigraph;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.context.ModelSourceContext;
import org.unidata.mdm.core.exception.UPathException;
import org.unidata.mdm.core.service.CustomPropertiesSupport;
import org.unidata.mdm.core.service.UPathService;
import org.unidata.mdm.core.type.upath.UPath;
import org.unidata.mdm.core.type.upath.UPathElement;
import org.unidata.mdm.dq.core.service.impl.instance.AbstractCleanseFunctionImpl;
import org.unidata.mdm.dq.core.service.impl.instance.CompositeCleanseFunctionImpl;
import org.unidata.mdm.dq.core.service.impl.instance.GroovyCleanseFunctionImpl;
import org.unidata.mdm.dq.core.service.impl.instance.JavaCleanseFunctionImpl;
import org.unidata.mdm.dq.core.service.impl.instance.PythonCleanseFunctionImpl;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionType;
import org.unidata.mdm.dq.core.type.model.instance.PortElement;
import org.unidata.mdm.dq.core.type.model.source.AbstractCleanseFunctionSource;
import org.unidata.mdm.dq.core.type.model.source.CleanseFunctionGroup;
import org.unidata.mdm.dq.core.type.model.source.CompositeCleanseFunctionNode;
import org.unidata.mdm.dq.core.type.model.source.CompositeCleanseFunctionSource;
import org.unidata.mdm.dq.core.type.model.source.CompositeCleanseFunctionTransition;
import org.unidata.mdm.dq.core.type.model.source.DataQualityModel;
import org.unidata.mdm.dq.core.type.model.source.GroovyCleanseFunctionSource;
import org.unidata.mdm.dq.core.type.model.source.JavaCleanseFunctionSource;
import org.unidata.mdm.dq.core.type.model.source.PythonCleanseFunctionSource;
import org.unidata.mdm.dq.core.type.model.source.assignment.NameSpaceAssignmentSource;
import org.unidata.mdm.dq.core.type.model.source.constant.CleanseFunctionConstant;
import org.unidata.mdm.dq.core.type.model.source.rule.InputPortMapping;
import org.unidata.mdm.dq.core.type.model.source.rule.MappingSetSource;
import org.unidata.mdm.dq.core.type.model.source.rule.OutputPortMapping;
import org.unidata.mdm.dq.core.type.model.source.rule.QualityRuleSource;
import org.unidata.mdm.dq.core.type.model.source.rule.RuleMappingSource;
import org.unidata.mdm.dq.core.type.model.source.rule.ValidationSettings;
import org.unidata.mdm.dq.core.util.DQUtils;
import org.unidata.mdm.system.exception.ValidationResult;
import org.unidata.mdm.system.util.NameSpaceUtils;
import org.unidata.mdm.system.util.TextUtils;

/**
 * @author Mikhail Mikhailov on Feb 5, 2021
 * DQ validations.
 */
@Component
public class DataQualityValidationComponent implements CustomPropertiesSupport {
    /**
     * Default validation pattern for names in model:
     * an ASCII letter, followed by any number of ASCII letters, digits, underscores or hyphens (case insensitive).
     */
    public static final Pattern DEFAULT_MODEL_NAME_PATTERN = Pattern.compile("^[a-z][a-z0-9_-]*$", Pattern.CASE_INSENSITIVE);
    /**
     * PRECHECK message code: cycles were found in the graph of a composite cleanse function.
     */
    private static final String PRECHECK_COMPOSITE_CYCLES_DETECTED = "app.dq.meta.composite.cycles.detected";
    /**
     * Message code: the cleanse function, referenced by a quality rule, was not found.
     */
    private static final String VALIDATE_RULE_FUNCTION_ABSENT = "app.meta.dq.rule.function.not.found";
    /**
     * Message code: a quality rule defines neither validation nor enrichment settings.
     */
    private static final String VALIDATE_RULE_IS_USELESS = "app.meta.dq.rule.useless";
    /**
     * Message code: validation settings of a quality rule are incomplete
     * (message, category, severity or raise port is undefined).
     */
    private static final String VALIDATE_RULE_INVALID_VALIDATION_SETTINGS = "app.meta.dq.rule.validation.invalid";
    /**
     * Message code: selection settings of a quality rule are missing
     * or select neither 'all' nor any source system.
     */
    private static final String VALIDATE_RULE_INVALID_SELECTION_SETTINGS = "app.meta.dq.rule.selection.invalid";
    /**
     * Message code: a rule mapping references a rule, which doesn't exist in the model.
     */
    private static final String VALIDATE_MAPPING_RULE_NOT_FOUND = "app.meta.dq.mapping.rule.not.found";
    /**
     * Message code: a UPath expression of a mapping differs from its parsed form
     * or (for port paths) doesn't end with a collecting element.
     */
    private static final String VALIDATE_MAPPING_ROOT_PATH_INVALID = "app.meta.dq.mapping.root.path.invalid";
    /**
     * Message code: parsing of a UPath expression of a mapping failed with a UPath exception.
     */
    private static final String VALIDATE_MAPPING_ROOT_PATH_FAILED = "app.meta.dq.mapping.root.path.failed";
    /**
     * Message code: a rule mapping contains no mapping for a required function port.
     */
    private static final String VALIDATE_MAPPING_REQUIRED_PORT_MISSING = "app.meta.dq.mapping.required.port.missing";
    /**
     * Message code: a port mapping defines both a UPath expression and a constant value (UN-7624).
     */
    private static final String VALIDATE_MAPPING_UPATH_AND_CONSTANT_DEFINED = "app.meta.dq.mapping.upath.and.constant.defined";
    /**
     * Message code: a constant with empty value is mapped to a required port.
     */
    private static final String VALIDATE_MAPPING_CONSTANT_IS_EMPTY = "app.meta.dq.mapping.constant.is.empty";
    /**
     * Message code: the shape of a constant (array or single) is not among the input types of the port.
     */
    private static final String VALIDATE_MAPPING_CONSTANT_INPUT_TYPE_NOT_SUPPORTED = "app.meta.dq.mapping.constant.input.type.not.supported";
    /**
     * Message code: the value type of a constant is not among the value types of the port.
     */
    private static final String VALIDATE_MAPPING_CONSTANT_VALUE_TYPE_NOT_SUPPORTED = "app.meta.dq.mapping.constant.value.type.not.supported";
    /**
     * Message code: the UPath of an incoming port is not a sub path of the mapping's root path.
     */
    private static final String VALIDATE_MAPPING_INVALID_INCOMING_SUBPATH = "app.meta.dq.mapping.invalid.incoming.subpath";
    /**
     * Message code: the UPath of an outgoing port is not a sub path of the mapping's root path.
     */
    private static final String VALIDATE_MAPPING_INVALID_OUTGOING_SUBPATH = "app.meta.dq.mapping.invalid.outgoing.subpath";
    /**
     * Message code: the UPath of an incoming port is longer than the mapping's local path plus one element.
     * The key previously duplicated the 'outgoing' one (copy-paste), what made incoming port errors
     * resolve to the outgoing port message.
     * NOTE(review): make sure, the message bundles define this key.
     */
    private static final String VALIDATE_MAPPING_INVALID_INCOMING_LOCAL = "app.meta.dq.mapping.invalid.incoming.local";
    /**
     * Message code: the UPath of an outgoing port is longer than the mapping's local path plus one element.
     */
    private static final String VALIDATE_MAPPING_INVALID_OUTGOING_LOCAL = "app.meta.dq.mapping.invalid.outgoing.local";
    /**
     * Message code: the namespace of an assignment could not be found.
     */
    private static final String VALIDATE_ASSIGNMENTS_NAMESPACE_NOT_FOUND = "app.meta.dq.assignment.namespace.not.found";
    /**
     * Message code: an assignment references mapping sets, which don't exist in the model.
     */
    private static final String VALIDATE_ASSIGNMENTS_SETS_NOT_FOUND = "app.meta.dq.assignment.sets.not.found";
    /**
     * @author Mikhail Mikhailov on Apr 6, 2021
     * Kinds of named model objects, whose names are checked in the same way.
     * Every kind carries its own set of message codes for the name checks.
     */
    private enum NamedValidatedObject {
        /**
         * Cleanse functions.
         */
        FUNCTION(
                "app.meta.dq.function.name.empty",
                "app.meta.dq.function.name.invalid",
                "app.meta.dq.function.name.too.long",
                "app.meta.dq.function.duplicate.names"),
        /**
         * Quality rules.
         */
        RULE(
                "app.meta.dq.rule.name.empty",
                "app.meta.dq.rule.name.invalid",
                "app.meta.dq.rule.name.too.long",
                "app.meta.dq.rule.duplicate.names"),
        /**
         * Rule mapping sets.
         */
        SET(
                "app.meta.dq.set.name.empty",
                "app.meta.dq.set.name.invalid",
                "app.meta.dq.set.name.too.long",
                "app.meta.dq.set.duplicate.names"),
        /**
         * Function groups.
         */
        GROUP(
                "app.meta.dq.group.name.empty",
                "app.meta.dq.group.name.invalid",
                "app.meta.dq.group.name.too.long",
                "app.meta.dq.group.duplicate.names");
        /**
         * Code for a blank name.
         */
        private final String nameEmpty;
        /**
         * Code for a name, that does not match the name pattern.
         */
        private final String nameInvalid;
        /**
         * Code of the 'name too long' message.
         */
        private final String nameTooLong;
        /**
         * Code of the 'duplicate names' message.
         */
        private final String nameDuplicate;
        /**
         * Constructor.
         * @param nameEmpty code for blank names
         * @param nameInvalid code for malformed names
         * @param nameTooLong code for too long names
         * @param nameDuplicate code for duplicate names
         */
        NamedValidatedObject(String nameEmpty, String nameInvalid, String nameTooLong, String nameDuplicate) {
            this.nameEmpty = nameEmpty;
            this.nameInvalid = nameInvalid;
            this.nameTooLong = nameTooLong;
            this.nameDuplicate = nameDuplicate;
        }

        /**
         * @return the nameEmpty
         */
        public String getNameEmpty() {
            return this.nameEmpty;
        }

        /**
         * @return the nameInvalid
         */
        public String getNameInvalid() {
            return this.nameInvalid;
        }

        /**
         * @return the nameTooLong
         */
        public String getNameTooLong() {
            return this.nameTooLong;
        }

        /**
         * @return the nameDuplicate
         */
        public String getNameDuplicate() {
            return this.nameDuplicate;
        }

        /**
         * @return human readable title of the object kind, used as a prefix in messages
         */
        public String getObjectTypeTitle() {

            if (this == FUNCTION) {
                return "Cleanse function";
            }

            if (this == RULE) {
                return "Rule";
            }

            if (this == SET) {
                return "Rule mapping set";
            }

            if (this == GROUP) {
                return "Function group";
            }

            // Not reachable with the current set of constants.
            return StringUtils.EMPTY;
        }
    }
    /**
     * The cleanse function cache component (injected).
     */
    @Autowired
    private CleanseFunctionCacheComponent cleanseFunctionCacheComponent;
    /**
     * UPath service (injected). Used to parse UPath expressions of rule mappings.
     */
    @Autowired
    private UPathService upathService;
    /**
     * Constructor.
     */
    public DataQualityValidationComponent() {
        super();
    }

    /**
     * Returns objections against the given model change.
     * The change is not inspected at the moment, the result is always empty.
     * @param change the model change context
     * @return empty collection
     */
    public Collection<ValidationResult> allow(ModelSourceContext<?> change) {
        final Collection<ValidationResult> noObjections = Collections.emptyList();
        return noObjections;
    }

    /**
     * Runs prechecks on the model. Currently every composite function is checked for cycles.
     * @param model the model to check
     * @return collection of found problems, empty if there are none
     */
    public Collection<ValidationResult> precheck(DataQualityModel model) {

        Collection<ValidationResult> collected = new ArrayList<>();
        for (CompositeCleanseFunctionSource composite : model.getCompositeFunctions()) {
            collected.addAll(precheckCompositeCycles(composite));
        }

        return collected;
    }

    /**
     * Checks the graph of a composite cleanse function for cycles.
     * One validation result is created per simple cycle found.
     * @param ccfs the composite function source
     * @return collection of cycle messages or empty collection, if the graph has no cycles
     */
    private Collection<ValidationResult> precheckCompositeCycles(CompositeCleanseFunctionSource ccfs) {

        DirectedMultigraph<CompositeCleanseFunctionNode, CompositeCleanseFunctionTransition> graph
            = DQUtils.ofSource(ccfs);

        CycleDetector<CompositeCleanseFunctionNode, CompositeCleanseFunctionTransition> detector
            = new CycleDetector<>(graph);

        // Cheap check first. The enumeration of all simple cycles below is run only,
        // if the graph is known to contain at least one cycle.
        if (!detector.detectCycles()) {
            return Collections.emptyList();
        }

        // If found at least one cycle, all of them should be detected.
        SzwarcfiterLauerSimpleCycles<CompositeCleanseFunctionNode, CompositeCleanseFunctionTransition> finder
            = new SzwarcfiterLauerSimpleCycles<>();

        finder.setGraph(graph);

        List<List<CompositeCleanseFunctionNode>> cycles = finder.findSimpleCycles();
        if (CollectionUtils.isEmpty(cycles)) {
            return Collections.emptyList();
        }

        List<ValidationResult> messages = new ArrayList<>(cycles.size());
        for (List<CompositeCleanseFunctionNode> cycle : cycles) {

            Set<CompositeCleanseFunctionTransition> path = new LinkedHashSet<>();
            for (int i = 0; i < cycle.size(); i++) {

                CompositeCleanseFunctionNode current = cycle.get(i);
                // Wrap around at the end: the last node of a cycle is connected with the first one,
                // and the transitions of this closing step belong to the reported path too.
                CompositeCleanseFunctionNode next = cycle.get((i + 1) % cycle.size());

                // Transitions are collected in both directions, the set removes repetitions.
                path.addAll(graph.getAllEdges(current, next));
                path.addAll(graph.getAllEdges(next, current));
            }

            // NOTE(review): the text has two placeholders, but only the path is supplied as argument.
            // Confirm, whether the function name is expected as the first argument.
            messages.add(new ValidationResult("Cycles in composite function [{}] detected. This is not allowed. Paths in question {}",
                    PRECHECK_COMPOSITE_CYCLES_DETECTED, path.toString()));
        }

        return messages;
    }

    /**
     * Validates the whole model: functions, rules, mapping sets, groups and assignments, in this order.
     * @param model the model to validate
     * @return collection of validation errors, empty if nothing was found
     */
    public Collection<ValidationResult> validate(DataQualityModel model) {

        final List<ValidationResult> collected = new ArrayList<>();

        // Function instances by name. Filled by the functions step, read by the rules and sets steps.
        final Map<String, AbstractCleanseFunctionImpl<?>> functionsByName = new HashMap<>();

        validateFunctions(collected, model, functionsByName);
        validateRules(collected, model, functionsByName);
        validateSets(collected, model, functionsByName);
        validateGroups(collected, model);
        validateAssignments(collected, model);

        return collected;
    }

    /**
     * Validates cleanse functions of the model: names, custom properties and the ability to operate.
     * Functions, which could be instantiated, are put to the supplied map by name.
     * @param errors the errors sink
     * @param model the model
     * @param fc function instances by name (filled here)
     */
    private void validateFunctions(List<ValidationResult> errors,
            DataQualityModel model,
            @Nonnull Map<String, AbstractCleanseFunctionImpl<?>> fc) {

        List<AbstractCleanseFunctionSource<?>> sources = getAllFunctions(model);
        if (CollectionUtils.isEmpty(sources)) {
            return;
        }

        // 1. Names of all functions are checked at once.
        List<String> functionNames = new ArrayList<>(sources.size());
        for (AbstractCleanseFunctionSource<?> source : sources) {
            functionNames.add(source.getName());
        }

        errors.addAll(validateNames(functionNames, NamedValidatedObject.FUNCTION));

        // 2. Per function checks. Functions with blank names are skipped.
        // NOTE(review): other validations in this class supply a message code as the second
        // constructor argument of the result. Here the function name takes that position.
        // Confirm, that the intended ValidationResult constructor is hit.
        for (AbstractCleanseFunctionSource<?> source : sources) {

            if (StringUtils.isBlank(source.getName())) {
                continue;
            }

            // 2.1. Custom props
            errors.addAll(validateCustomProperties(source.getName(), source.getCustomProperties()));

            // 2.2. Instantiation and readiness
            AbstractCleanseFunctionImpl<?> instance = getFunctionImpl(model, source);
            if (Objects.isNull(instance)) {
                errors.add(new ValidationResult("Cleanse function [{}] can not be instantiated.", source.getName()));
                continue;
            }

            if (!instance.isReady()) {
                errors.add(new ValidationResult("Cleanse function [{}] is not ready for operation.", instance.getName()));
            }

            if (StringUtils.isNotBlank(instance.getNote())) {
                errors.add(new ValidationResult("Cleanse function [{}] has additional failure notes [{}].",
                        instance.getName(), instance.getNote()));
            }

            fc.put(instance.getName(), instance);
        }
    }

    /**
     * Validates quality rules of the model: names first, then each rule on its own.
     * @param errors the errors sink
     * @param model the model
     * @param fc function instances by name
     */
    private void validateRules(List<ValidationResult> errors,
            DataQualityModel model,
            @Nonnull Map<String, AbstractCleanseFunctionImpl<?>> fc) {

        List<QualityRuleSource> rules = model.getRules();
        if (CollectionUtils.isEmpty(rules)) {
            return;
        }

        // 1. Names
        List<String> ruleNames = new ArrayList<>(rules.size());
        for (QualityRuleSource rule : rules) {
            ruleNames.add(rule.getName());
        }

        errors.addAll(validateNames(ruleNames, NamedValidatedObject.RULE));

        // 2. Content
        rules.forEach(rule -> validateRule(errors, fc, rule));
    }

    /**
     * Validates a single quality rule: custom properties, function reference,
     * validation / enrichment settings and selection settings.
     * @param errors the errors sink
     * @param fc function instances by name
     * @param rule the rule to validate
     */
    private void validateRule(List<ValidationResult> errors,
            @Nonnull Map<String, AbstractCleanseFunctionImpl<?>> fc,
            QualityRuleSource rule) {

        // 1. Custom props.
        errors.addAll(validateCustomProperties(rule.getName(), rule.getCustomProperties()));

        // 2. The referenced function must be among the known ones.
        boolean functionKnown = fc.containsKey(rule.getCleanseFunctionName());
        if (!functionKnown) {
            errors.add(new ValidationResult("Cleanse Function [{}] from rule definition [{}] not found.",
                    VALIDATE_RULE_FUNCTION_ABSENT, rule.getCleanseFunctionName(), rule.getName()));
        }

        // 3. A rule must either validate or enrich.
        if (Objects.isNull(rule.getValidationSettings()) && Objects.isNull(rule.getEnrichmentSettings())) {
            errors.add(new ValidationResult("Quality rule [{}] does nothing (Neither validation nor enrichment settings defined).",
                    VALIDATE_RULE_IS_USELESS, rule.getName()));
        }

        // 4. Validation settings, if given, must define message, category, severity
        // (each either as a fixed value or as a port) and the raise port.
        ValidationSettings settings = rule.getValidationSettings();
        if (Objects.nonNull(settings)) {

            boolean hasMessage = StringUtils.isNotBlank(settings.getMessageText()) || StringUtils.isNotBlank(settings.getMessagePort());
            boolean hasCategory = StringUtils.isNotBlank(settings.getCategoryText()) || StringUtils.isNotBlank(settings.getCategoryPort());
            boolean hasSeverity = Objects.nonNull(settings.getSeverityIndicator()) || StringUtils.isNotBlank(settings.getSeverityPort());
            boolean hasRaise = StringUtils.isNotBlank(settings.getRaisePort());

            if (!(hasMessage && hasCategory && hasSeverity && hasRaise)) {
                errors.add(new ValidationResult("Quality rule [{}] has incorrect validation settings.",
                        VALIDATE_RULE_INVALID_VALIDATION_SETTINGS, rule.getName()));
            }
        }

        // 5. Selection settings must exist and select either 'all' or at least one source system.
        boolean selectionValid = Objects.nonNull(rule.getSelectionSettings())
                && (rule.getSelectionSettings().isAll() || !rule.getSelectionSettings().getSourceSystems().isEmpty());

        if (!selectionValid) {
            errors.add(new ValidationResult("Quality rule [{}] has incorrect selection settings.",
                    VALIDATE_RULE_INVALID_SELECTION_SETTINGS, rule.getName()));
        }
    }

    /**
     * Validates rule mapping sets of the model: set names and then every rule mapping
     * (rule reference, local path, input and output port mappings).
     * @param errors the errors sink
     * @param model the model
     * @param fc function instances by name
     */
    private void validateSets(List<ValidationResult> errors,
            DataQualityModel model,
            @Nonnull Map<String, AbstractCleanseFunctionImpl<?>> fc) {

        List<MappingSetSource> sets = model.getSets();
        if (CollectionUtils.isEmpty(sets)) {
            return;
        }

        // 1. Set names
        List<String> setNames = new ArrayList<>(sets.size());
        for (MappingSetSource set : sets) {
            setNames.add(set.getName());
        }

        errors.addAll(validateNames(setNames, NamedValidatedObject.SET));

        // 2. Mappings
        for (MappingSetSource mss : sets) {

            for (RuleMappingSource rms : mss.getMappings()) {

                boolean ruleExists = model.getRules().stream()
                        .anyMatch(r -> StringUtils.equals(r.getName(), rms.getRuleName()));

                // A non existing rule referenced. Nothing else can be checked for this mapping.
                if (!ruleExists) {

                    errors.add(new ValidationResult("Quality rule mapping in set [{}] references a non-existing rule [{}].",
                            VALIDATE_MAPPING_RULE_NOT_FOUND, mss.getName(), rms.getRuleName()));

                    continue;
                }

                // The local path is not a port path, so it doesn't have to end with a collecting element.
                UPath localRoot = validatePath(errors, mss, rms, rms.getLocalPath(), false);
                validateInputMapping(errors, model, mss, rms, fc, localRoot);
                validateOutputMapping(errors, model, mss, rms, fc, localRoot);
            }
        }
    }

    /**
     * Validates input port mappings of a rule mapping against the input ports of the function,
     * executed by the mapped rule.
     * @param errors the errors sink
     * @param model the model
     * @param mss the mapping set, the mapping belongs to
     * @param rms the rule mapping
     * @param fc function instances by name
     * @param root parsed local path of the mapping, may be null
     */
    private void validateInputMapping(List<ValidationResult> errors, DataQualityModel model, MappingSetSource mss, RuleMappingSource rms,
            @Nonnull Map<String, AbstractCleanseFunctionImpl<?>> fc, UPath root) {

        // The rule is selected first and the name is mapped on the Optional afterwards.
        // Mapping on the stream and calling findFirst() then would throw NPE
        // for a rule without function name, instead of returning quietly below.
        String functionName = model.getRules().stream()
            .filter(r -> StringUtils.equals(r.getName(), rms.getRuleName()))
            .findFirst()
            .map(QualityRuleSource::getCleanseFunctionName)
            .orElse(null);

        AbstractCleanseFunctionImpl<?> acfi = fc.get(functionName);

        // 'Function null', 'rule, referencing nothing' were already reported.
        // So just return.
        if (Objects.isNull(acfi)) {
            return;
        }

        for (PortElement pel : acfi.getInputPorts()) {

            InputPortMapping ipm = rms.getInputMappings().stream()
                .filter(i -> StringUtils.equals(pel.getName(), i.getPortName()))
                .findFirst()
                .orElse(null);

            // Unmapped ports are a problem only, if the port is required.
            if (ipm == null) {

                if (pel.isRequired()) {
                    errors.add(new ValidationResult(
                            "Rule mapping in set [{}] for rule [{}] does not contain mapping for required input port [{}], defined in function [{}].",
                            VALIDATE_MAPPING_REQUIRED_PORT_MISSING, mss.getName(), rms.getRuleName(), pel.getName(), functionName));
                }

                continue;
            }

            // UN-7624
            if (StringUtils.isNotBlank(ipm.getInputPath()) && Objects.nonNull(ipm.getConstantValue())) {
                errors.add(new ValidationResult(
                        "Rule mapping in set [{}] for rule [{}] contains both UPath element and a constant value in input port [{}], what is not allowed.",
                        VALIDATE_MAPPING_UPATH_AND_CONSTANT_DEFINED, mss.getName(), rms.getRuleName(), pel.getName()));
            }

            // Both parts are still checked on their own, even if both are defined.
            if (StringUtils.isNotBlank(ipm.getInputPath())) {
                UPath port = validatePath(errors, mss, rms, ipm.getInputPath(), true);
                validateSubpath(errors, mss, rms, root, port, pel, true);
            }

            if (Objects.nonNull(ipm.getConstantValue())) {
                validateConstant(errors, mss, rms, pel, ipm.getConstantValue());
            }
        }
    }

    /**
     * Validates output port mappings of a rule mapping against the output ports of the function,
     * executed by the mapped rule.
     * @param errors the errors sink
     * @param model the model
     * @param mss the mapping set, the mapping belongs to
     * @param rms the rule mapping
     * @param fc function instances by name
     * @param root parsed local path of the mapping, may be null
     */
    private void validateOutputMapping(List<ValidationResult> errors, DataQualityModel model, MappingSetSource mss, RuleMappingSource rms,
            @Nonnull Map<String, AbstractCleanseFunctionImpl<?>> fc, UPath root) {

        // The rule is selected first and the name is mapped on the Optional afterwards.
        // Mapping on the stream and calling findFirst() then would throw NPE
        // for a rule without function name, instead of returning quietly below.
        String functionName = model.getRules().stream()
                .filter(r -> StringUtils.equals(r.getName(), rms.getRuleName()))
                .findFirst()
                .map(QualityRuleSource::getCleanseFunctionName)
                .orElse(null);

        AbstractCleanseFunctionImpl<?> acfi = fc.get(functionName);

        // 'Function null', 'rule, referencing nothing' were already reported.
        // So just return.
        if (Objects.isNull(acfi)) {
            return;
        }

        for (PortElement pel : acfi.getOutputPorts()) {

            OutputPortMapping opm = rms.getOutputMappings().stream()
                .filter(i -> StringUtils.equals(pel.getName(), i.getPortName()))
                .findFirst()
                .orElse(null);

            // Unmapped ports are a problem only, if the port is required.
            if (opm == null) {

                if (pel.isRequired()) {
                    errors.add(new ValidationResult(
                            "Rule mapping in set [{}] for rule [{}] does not contain mapping for required output port [{}], defined in function [{}].",
                            VALIDATE_MAPPING_REQUIRED_PORT_MISSING, mss.getName(), rms.getRuleName(), pel.getName(), functionName));
                }

                continue;
            }

            // UN-7624
            if (StringUtils.isNotBlank(opm.getOutputPath()) && Objects.nonNull(opm.getConstantValue())) {
                errors.add(new ValidationResult(
                        "Rule mapping in set [{}] for rule [{}] contains both UPath element and a constant value in output port [{}], what is not allowed.",
                        VALIDATE_MAPPING_UPATH_AND_CONSTANT_DEFINED, mss.getName(), rms.getRuleName(), pel.getName()));
            }

            // Both parts are still checked on their own, even if both are defined.
            if (StringUtils.isNotBlank(opm.getOutputPath())) {
                UPath port = validatePath(errors, mss, rms, opm.getOutputPath(), true);
                validateSubpath(errors, mss, rms, root, port, pel, false);
            }

            if (Objects.nonNull(opm.getConstantValue())) {
                validateConstant(errors, mss, rms, pel, opm.getConstantValue());
            }
        }
    }

    /**
     * Parses and validates a UPath expression of a rule mapping.
     * The expression is valid, if it parses, if the parsed form equals the (trimmed) input
     * and, for port paths, if the last element is a collecting one.
     * @param errors the errors sink
     * @param mss the mapping set, the mapping belongs to
     * @param rms the rule mapping
     * @param path the expression, may be null
     * @param isPort true, if the expression is a port path
     * @return parsed UPath or null, if the path is null or invalid
     */
    private UPath validatePath(List<ValidationResult> errors, MappingSetSource mss, RuleMappingSource rms, String path, boolean isPort) {

        if (Objects.isNull(path)) {
            return null;
        }

        try {

            UPath parsed = upathService.upathCreate(path);
            String rendered = parsed.toUPath();

            // The parsed form must give the same expression back.
            if (!StringUtils.equals(StringUtils.trim(path), StringUtils.trim(rendered))) {
                errors.add(new ValidationResult("Rule mapping for rule [{}] in mapping set [{}] contains invalid UPath expression [{}], which differs from parsed result [{}].",
                        VALIDATE_MAPPING_ROOT_PATH_INVALID, rms.getRuleName(), mss.getName(), path, rendered));
                return null;
            }

            if (isPort && !parsed.getTail().isCollecting()) {
                errors.add(new ValidationResult("Rule mapping for rule [{}] in mapping set [{}] contains invalid UPath expression [{}]. "
                        + "Last element of the port mapping must be a collecting element.",
                        VALIDATE_MAPPING_ROOT_PATH_INVALID, rms.getRuleName(), mss.getName(), path));
                return null;
            }

            return parsed;

        } catch (UPathException e) {
            errors.add(new ValidationResult(
                    "Rule mapping for rule [{}] in mapping set [{}] contains UPath expression, which didn't pass validation. Exception caught [{}].",
                    VALIDATE_MAPPING_ROOT_PATH_FAILED, rms.getRuleName(), mss.getName(), TextUtils.getText(e.getId().code(), e.getArgs())));
            return null;
        }
    }

    /**
     * Validates a constant, mapped to a function port: an empty constant is not accepted
     * for a required port, and both the shape (array / single) and the value type
     * of the constant must be among those, declared by the port.
     * @param errors the errors sink
     * @param mss the mapping set, the mapping belongs to
     * @param rms the rule mapping
     * @param pel the port
     * @param cfc the constant
     */
    private void validateConstant(List<ValidationResult> errors, MappingSetSource mss, RuleMappingSource rms,
            PortElement pel, CleanseFunctionConstant cfc) {

        // Empty value for a required port. The type checks are skipped then.
        boolean emptyForRequired = cfc.isEmpty() && pel.isRequired();
        if (emptyForRequired) {
            errors.add(new ValidationResult("Rule mapping for rule [{}] in mapping set [{}] defines a constant with empty value for required port [{}].",
                    VALIDATE_MAPPING_CONSTANT_IS_EMPTY, rms.getRuleName(), mss.getName(), pel.getName()));

            return;
        }

        // Shape of the constant against the input types of the port.
        boolean arrayShapeRejected = cfc.isArray() && !pel.getInputTypes().contains(CleanseFunctionPortInputType.ARRAY);
        boolean inputTypeRejected = arrayShapeRejected
                || (cfc.isSingle() && !pel.getInputTypes().contains(CleanseFunctionPortInputType.SIMPLE));

        if (inputTypeRejected) {
            errors.add(new ValidationResult("Rule mapping for rule [{}] in mapping set [{}] defines a constant for port [{}] of INPUT type, "
                    + "which is not supported by the executing cleanse function.",
                    VALIDATE_MAPPING_CONSTANT_INPUT_TYPE_NOT_SUPPORTED, rms.getRuleName(), mss.getName(), pel.getName()));
        }

        // Value type of the constant against the value types of the port.
        boolean arrayValueRejected = cfc.isArray() && !pel.getValueTypes().contains(cfc.getArray().getType().toFunctionPortValueType());
        boolean valueTypeRejected = arrayValueRejected
                || (cfc.isSingle() && !pel.getValueTypes().contains(cfc.getSingle().getType().toFunctionPortValueType()));

        if (valueTypeRejected) {
            errors.add(new ValidationResult("Rule mapping for rule [{}] in mapping set [{}] defines a constant for port [{}] of VALUE type, "
                    + "which is not supported by the executing cleanse function.",
                    VALIDATE_MAPPING_CONSTANT_VALUE_TYPE_NOT_SUPPORTED, rms.getRuleName(), mss.getName(), pel.getName()));
        }
    }

    /**
     * Checks, that a port path is a valid sub path of the mapping's local (root) path.
     * Nothing is checked, if one of the paths is null or the root path denotes the root.
     * A port path is rejected, if it is more than one segment longer than the root path,
     * or if it doesn't start with the elements of the root path.
     * @param errors the errors sink
     * @param mss the mapping set, the mapping belongs to
     * @param rms the rule mapping
     * @param rootUpath parsed local path of the mapping, may be null
     * @param portUpath parsed port path, may be null
     * @param pel the port
     * @param isIncoming true for input ports, false for output ports
     */
    private void validateSubpath(List<ValidationResult> errors, MappingSetSource mss, RuleMappingSource rms,
            UPath rootUpath, UPath portUpath, PortElement pel, boolean isIncoming) {

        if (Objects.isNull(rootUpath) || Objects.isNull(portUpath) || rootUpath.isRoot()) {
            return;
        }

        if ((portUpath.getNumberOfSegments() - rootUpath.getNumberOfSegments()) > 1) {

            if (isIncoming) {
                errors.add(new ValidationResult(
                        "Rule mapping for rule [{}] in mapping set [{}] defines UPath expression in incoming port [{}], "
                        + "which is not a valid LOCAL sub path of the rule execution context. It is longer than parent path plus one element.",
                        VALIDATE_MAPPING_INVALID_INCOMING_LOCAL,
                        rms.getRuleName(), mss.getName(), pel.getName()));
            } else {
                errors.add(new ValidationResult(
                        "Rule mapping for rule [{}] in mapping set [{}] defines UPath expression in outgoing port [{}], "
                        + "which is not a valid LOCAL sub path of the rule execution context. It is longer than parent path plus one element.",
                        VALIDATE_MAPPING_INVALID_OUTGOING_LOCAL,
                        rms.getRuleName(), mss.getName(), pel.getName()));
            }

            return;
        }

        List<UPathElement> rootElements = rootUpath.getElements();
        List<UPathElement> portElements = portUpath.getElements();
        for (int i = 0; i < rootElements.size(); i++) {

            UPathElement rootEl = rootElements.get(i);

            // A port path, which is shorter than the root path, can not be its sub path.
            // This is reported as a mismatch instead of failing with an index out of bounds.
            boolean portExhausted = i >= portElements.size();
            if (!portExhausted && StringUtils.equals(rootEl.getElement(), portElements.get(i).getElement())) {
                continue;
            }

            // The offending element: the mismatching element of the port path or,
            // if the port path is too short, the root element, which has no counterpart.
            String offending = portExhausted ? rootEl.getElement() : portElements.get(i).getElement();

            if (isIncoming) {
                errors.add(new ValidationResult(
                        "Rule mapping for rule [{}] in mapping set [{}] defines UPath expression in incoming port [{}], "
                        + "which is not a valid sub path of the mapping's root path ([{}] element).",
                        VALIDATE_MAPPING_INVALID_INCOMING_SUBPATH,
                        rms.getRuleName(), mss.getName(), pel.getName(), offending));
            } else {
                errors.add(new ValidationResult(
                        "Rule mapping for rule [{}] in mapping set [{}] defines UPath expression in outgoing port [{}], "
                        + "which is not a valid sub path of the mapping's root path ([{}] element).",
                        VALIDATE_MAPPING_INVALID_OUTGOING_SUBPATH,
                        rms.getRuleName(), mss.getName(), pel.getName(), offending));
            }

            // Only the first mismatch is reported.
            break;
        }
    }

    /**
     * Validates names of the function groups, collected from the model's group tree.
     * @param errors the errors sink
     * @param model the model
     */
    private void validateGroups(List<ValidationResult> errors, DataQualityModel model) {

        Map<String, CleanseFunctionGroup> collectedGroups = getAllGroups(model.getFunctionGroup(), StringUtils.EMPTY, new HashMap<>());
        if (MapUtils.isEmpty(collectedGroups)) {
            return;
        }

        // Group names
        List<String> groupNames = new ArrayList<>(collectedGroups.size());
        for (CleanseFunctionGroup group : collectedGroups.values()) {
            groupNames.add(group.getName());
        }

        errors.addAll(validateNames(groupNames, NamedValidatedObject.GROUP));
    }

    /**
     * Validates namespace assignments of the model: the namespaces must exist
     * and the assigned mapping sets must be defined in the model.
     * Group names are not checked here. This is done by the group validation,
     * and repeating the check here reported every group name error twice.
     * @param errors the errors sink
     * @param model the model
     */
    private void validateAssignments(List<ValidationResult> errors, DataQualityModel model) {

        List<NameSpaceAssignmentSource> assignments = model.getAssignments();
        if (CollectionUtils.isEmpty(assignments)) {
            return;
        }

        // 1. Check namespaces for existence
        errors.addAll(assignments.stream()
                .map(NameSpaceAssignmentSource::getNameSpace)
                .filter(ns -> NameSpaceUtils.find(ns) == null)
                .map(ns -> new ValidationResult("Assignment incorrect. Namespace [{}] not found.", VALIDATE_ASSIGNMENTS_NAMESPACE_NOT_FOUND, ns))
                .collect(Collectors.toList()));

        // 2. Find mapping sets, missing in the model.
        // One result per namespace assignment is created, carrying all missing set names.
        errors.addAll(assignments.stream()
                .map(assignment ->
                    assignment.getAssignments().stream()
                        .flatMap(sa -> sa.getSets().stream())
                        .distinct()
                        .filter(sn -> model.getSets().stream().noneMatch(s -> StringUtils.equals(s.getName(), sn)))
                        .collect(Collectors.toList()))
                .filter(CollectionUtils::isNotEmpty)
                .map(missing -> new ValidationResult("Assignment incorrect. Mapping sets [{}] not found.", VALIDATE_ASSIGNMENTS_SETS_NOT_FOUND, missing))
                .collect(Collectors.toList()));
    }

    /**
     * Validates a collection of model object names.
     * <p>
     * For every name the following is checked:
     * <ul>
     * <li>the name is not blank (a blank name is reported and takes no part in further checks),</li>
     * <li>the name matches {@code DEFAULT_MODEL_NAME_PATTERN},</li>
     * <li>the name is not longer than 255 characters.</li>
     * </ul>
     * Finally, all non-blank names, that occur more than once, are reported together
     * in a single "duplicate" result.
     *
     * @param names the names to check
     * @param nvo the kind of validated objects, supplies the title and the error codes
     * @return collection of found problems, empty if all names are fine
     */
    private Collection<ValidationResult> validateNames(Collection<String> names, NamedValidatedObject nvo) {

        final int maxLength = 255;

        final List<ValidationResult> problems = new ArrayList<>();
        final Map<String, Integer> occurrences = new HashMap<>();

        for (String name : names) {

            // 1. Name. Blank names are reported and skipped.
            if (StringUtils.isBlank(name)) {
                problems.add(new ValidationResult(
                        nvo.getObjectTypeTitle() + " name is blank. Model object names must not be blank.",
                        nvo.getNameEmpty(),
                        nvo.getObjectTypeTitle()));
                continue;
            }

            // 2. Shape
            final boolean wellFormed = DEFAULT_MODEL_NAME_PATTERN.matcher(name).matches();
            if (!wellFormed) {
                problems.add(new ValidationResult(
                        nvo.getObjectTypeTitle() + " name [{}] is invalid. The name contains invalid characters.",
                        nvo.getNameInvalid(),
                        name));
            }

            // 3. Name length
            if (name.length() > maxLength) {
                problems.add(new ValidationResult(
                        nvo.getObjectTypeTitle() + " name too long. Name [{}], max length [{}]",
                        nvo.getNameTooLong(),
                        name, maxLength));
            }

            // 4. Uniqueness. Just count here, the report is built after the loop.
            occurrences.merge(name, 1, Integer::sum);
        }

        final Set<String> repeated = occurrences.entrySet().stream()
                .filter(counted -> counted.getValue().intValue() > 1)
                .map(Entry::getKey)
                .collect(Collectors.toSet());

        if (!repeated.isEmpty()) {
            problems.add(new ValidationResult(
                    "Duplicate " + nvo.getObjectTypeTitle() + " name(s) [{}]",
                    nvo.getNameDuplicate(),
                    String.join(", ", repeated)));
        }

        return problems;
    }

    /**
     * Creates and "implements" the runtime counterpart of a cleanse function source.
     * <p>
     * JAVA, GROOVY and PYTHON functions are implemented using the model's storage id
     * and the cleanse function cache component. COMPOSITE functions are created
     * with {@code null} as the first constructor argument and implemented without arguments.
     *
     * @param model the model, the function source belongs to
     * @param acfs the function source
     * @return the function implementation or null, if the source type is none of
     * JAVA, COMPOSITE, GROOVY or PYTHON
     */
    private AbstractCleanseFunctionImpl<?> getFunctionImpl(DataQualityModel model, AbstractCleanseFunctionSource<?> acfs) {

        final CleanseFunctionType type = acfs.getType();

        if (type == CleanseFunctionType.JAVA) {
            JavaCleanseFunctionImpl java = new JavaCleanseFunctionImpl((JavaCleanseFunctionSource) acfs);
            java.implement(model.getStorageId(), cleanseFunctionCacheComponent);
            return java;
        }

        if (type == CleanseFunctionType.COMPOSITE) {
            CompositeCleanseFunctionImpl composite = new CompositeCleanseFunctionImpl(null, (CompositeCleanseFunctionSource) acfs);
            composite.implement();
            return composite;
        }

        if (type == CleanseFunctionType.GROOVY) {
            GroovyCleanseFunctionImpl groovy = new GroovyCleanseFunctionImpl((GroovyCleanseFunctionSource) acfs);
            groovy.implement(model.getStorageId(), cleanseFunctionCacheComponent);
            return groovy;
        }

        if (type == CleanseFunctionType.PYTHON) {
            PythonCleanseFunctionImpl python = new PythonCleanseFunctionImpl((PythonCleanseFunctionSource) acfs);
            python.implement(model.getStorageId(), cleanseFunctionCacheComponent);
            return python;
        }

        // Unknown (or missing) type. Nothing to implement.
        return null;
    }

    /**
     * Recursively flattens a tree of function groups into a map, keyed by dotted group path.
     * <p>
     * The path of a group is its own name, if the prefix is blank, or
     * {@code prefix + '.' + name} otherwise. The group is registered before its children.
     *
     * @param parent the group to start from, may be null
     * @param prefix the path of the enclosing group, blank for the root
     * @param collector the map, the groups are put to
     * @return the collector or an immutable empty map, if parent is null
     */
    private Map<String, CleanseFunctionGroup> getAllGroups(CleanseFunctionGroup parent, String prefix, Map<String, CleanseFunctionGroup> collector) {

        if (parent == null) {
            return Collections.emptyMap();
        }

        final String path;
        if (StringUtils.isBlank(prefix)) {
            path = parent.getName();
        } else {
            path = prefix + '.' + parent.getName();
        }

        collector.put(path, parent);

        // Results of nested calls are ignored on purpose, everything ends up in the collector.
        for (CleanseFunctionGroup child : parent.getGroups()) {
            getAllGroups(child, path, collector);
        }

        return collector;
    }

    /**
     * Gathers all function sources of the model in a single new list.
     * <p>
     * The order is: composite, java, python, groovy functions.
     *
     * @param model the model
     * @return new list with all function sources of the model
     */
    private List<AbstractCleanseFunctionSource<?>> getAllFunctions(DataQualityModel model) {

        // Pre-size the result to hold all four kinds without re-allocation.
        final int total = model.getCompositeFunctions().size()
                + model.getJavaFunctions().size()
                + model.getPythonFunctions().size()
                + model.getGroovyFunctions().size();

        final List<AbstractCleanseFunctionSource<?>> all = new ArrayList<>(total);

        all.addAll(model.getCompositeFunctions());
        all.addAll(model.getJavaFunctions());
        all.addAll(model.getPythonFunctions());
        all.addAll(model.getGroovyFunctions());

        return all;
    }
}
